/*
 * Amazon FreeRTOS POSIX Demo V1.4.2
 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates.  All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy of
 * this software and associated documentation files (the "Software"), to deal in
 * the Software without restriction, including without limitation the rights to
 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
 * the Software, and to permit persons to whom the Software is furnished to do so,
 * subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * http://aws.amazon.com/freertos
 * http://www.FreeRTOS.org
 */

/**
  * @brief Demo intro: job distribution with actor model.
  *
  * This demo simulates job distribution with actor model.
  * https://en.wikipedia.org/wiki/Actor_model
  *
  * In this demo, vStartPOSIXDemo() first creates all mailboxes
  * which will be used by actors to send and receive messages.
  * Then it spins up two types of actors -- Dispatcher and Workers.
  *
  * Dispatcher -- Distributing sub-tasks to workers.
  *               Distribution is done by putting messages into each worker's inbox,
  *               which is essentially an mqueue. Dispatcher keeps distributing tasks
  *               until all intended tasks are distributed.
  *
  * Workers -- Take sub-tasks and perform predefined routine for each type of tasks.
  *
  * Upon finishing distributing all tasks, Dispatcher will send a "terminate" message to
  * each worker. vStartPOSIXDemo() will then join all actor threads and clean up mailboxes.
  *
  * @note A few assumptions are made in this demo, which a user might have to alter
  * if to adopt this model in a new application:
  *
  *  - The upper limit for MQUEUE_NUMBER_OF_WORKERS is set to 10.
  *    This is not due to physical constraint (e.g. memory), rather to make queue
  *    names end with a single digit number.
  *
  *  - Message enum is cast to char/uint8_t directly, with the assumption that
  *    the system is not going to have more than 254 messages, which is often true
  *    in practice. Could extend bits used in a message to either have more messages
  *    or include additional arguments for a message. Proper typecasting is needed
  *    in that case.
  *
  *  - The philosophy is "failure is expected". It is shown in both the way dispatcher
  *    delivers messages (i.e. messages can be dropped by worker(s)), and also the
  *    way workers process messages (i.e. workers do not inform dispatcher success or
  *    failure).
  *
  *  - Following the philosophy, dispatcher shall never use blocking calls to distribute
  *    tasks. The only exception made here is that dispatcher needs to make sure the
  *    successful delivery of "terminate" messages. So that, main thread could join
  *    all actor threads and finish the demo.
  */

/* FreeRTOS includes. */
#include "FreeRTOS.h"
#include "task.h"

/* FreeRTOS+POSIX. */
#include "FreeRTOS_POSIX/pthread.h"
#include "FreeRTOS_POSIX/mqueue.h"
#include "FreeRTOS_POSIX/time.h"
#include "FreeRTOS_POSIX/fcntl.h"
#include "FreeRTOS_POSIX/errno.h"
#include "FreeRTOS_POSIX/unistd.h"

/* System headers */
#include <stdbool.h>
#include <string.h>
#include <stdio.h>

/* Demo includes. */
#include "posix_demo.h"

/* Constants */
#define LINE_BREAK "\r\n"

/**
 * @brief Control messages.
 *
 * uint8_t is sufficient for this enum, that we are going to cast to char directly.
 * If ever needed, implement a function to properly typecast.
 */
/**@{ */
typedef enum ControlMessage {
	eMSG_LOWER_INAVLID =
		0x00, /**< Guard, let's not use 0x00 for messages. */
	eWORKER_CTRL_MSG_CONTINUE =
		0x01, /**< Dispatcher to worker, distributing another job. */
	eWORKER_CTRL_MSG_EXIT =
		0x02, /**< Dispatcher to worker, all jobs are finished and the worker receiving such can exit. */

	/* define additional messages here */

	eMSG_UPPER_INVALID =
		0xFF /**< Guard, additional tasks shall be defined above. */
} eControlMessage;
/**@} */

/**
 * @defgroup Configuration constants for the dispatcher-worker demo.
 */
/**@{ */
#define MQUEUE_NUMBER_OF_WORKERS                                               \
	(4) /**< The number of worker threads, each thread has one queue which is used as income box. */

#if (MQUEUE_NUMBER_OF_WORKERS > 10)
#error "Please keep MQUEUE_NUMBER_OF_WORKERS < 10."
#endif

#define MQUEUE_WORKER_QNAME_BASE "/qNode0" /**< Queue name base. */
#define MQUEUE_WORKER_QNAME_BASE_LEN (6) /** Queue name base length. */

#define MQUEUE_TIMEOUT_SECONDS                                                 \
	(1) /**< Relative timeout for mqueue functions. */
#define MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER                                   \
	(1) /**< Maximum number of messages in a queue. */

#define MQUEUE_MSG_WORKER_CTRL_MSG_SIZE                                        \
	sizeof(uint8_t) /**< Control message size. */
#define DEMO_ERROR (-1) /**< Any non-zero value would work. */
/**@} */

/**
 * @brief Structure used by Worker thread.
 */
/**@{ */
typedef struct WorkerThreadResources {
	pthread_t pxID; /**< thread ID. */
	mqd_t xInboxID; /**< mqueue inbox ID. */
} WorkerThreadResources_t;
/**@} */

/**
 * @brief Structure used by Dispatcher thread.
 */
/**@{ */
typedef struct DispatcherThreadResources {
	pthread_t pxID; /**< thread ID. */
	mqd_t *pOutboxID; /**< a list of mqueue outbox ID. */
} DispatcherThreadResources_t;
/**@} */

/*-----------------------------------------------------------*/

static void *prvWorkerThread(void *pvArgs)
{
	WorkerThreadResources_t pArgList = *(WorkerThreadResources_t *)pvArgs;

	printf("Worker thread #[%d] - start %s", (int)pArgList.pxID,
	       LINE_BREAK);

	struct timespec xReceiveTimeout = { 0 };

	ssize_t xMessageSize = 0;
	char pcReceiveBuffer[MQUEUE_MSG_WORKER_CTRL_MSG_SIZE] = { 0 };

	/* This is a worker thread that reacts based on what is sent to its inbox (mqueue). */
	while (true) {
		clock_gettime(CLOCK_REALTIME, &xReceiveTimeout);
		xReceiveTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;

		xMessageSize = mq_receive(pArgList.xInboxID, pcReceiveBuffer,
					  MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, 0);

		/* Parse messages */
		if (xMessageSize == MQUEUE_MSG_WORKER_CTRL_MSG_SIZE) {
			switch ((int)pcReceiveBuffer[0]) {
			case eWORKER_CTRL_MSG_CONTINUE:
				/* Task branch, currently only prints message to screen. */
				/* Could perform tasks here. Could also notify dispatcher upon completion, if desired. */
				printf("Worker thread #[%d] -- Received eWORKER_CTRL_MSG_CONTINUE %s",
				       (int)pArgList.pxID, LINE_BREAK);
				break;

			case eWORKER_CTRL_MSG_EXIT:
				printf("Worker thread #[%d] -- Finished. Exit now. %s",
				       (int)pArgList.pxID, LINE_BREAK);

				return NULL;

			default:
				/* Received a message that we don't care or not defined. */
				break;
			}
		} else {
			/* Invalid message. Error handling can be done here, if desired. */
		}
	}

	/* You should never hit here. */
	/* return NULL; */
}

/*-----------------------------------------------------------*/

static void *prvDispatcherThread(void *pvArgs)
{
	DispatcherThreadResources_t pArgList =
		*(DispatcherThreadResources_t *)pvArgs;

	printf("Dispatcher thread - start %s", LINE_BREAK);

	struct timespec xSendTimeout = { 0 };

	ssize_t xMessageSize = 0;
	char pcSendBuffer[MQUEUE_MSG_WORKER_CTRL_MSG_SIZE] = { 0 };

	/* Just for fun, let threads do a total of 100 independent tasks. */
	int i = 0;
	const int totalNumOfJobsPerThread = 100;

	/* Distribute 1000 independent tasks to workers, in round-robin fashion. */
	pcSendBuffer[0] = (char)eWORKER_CTRL_MSG_CONTINUE;

	for (i = 0; i < totalNumOfJobsPerThread; i++) {
		clock_gettime(CLOCK_REALTIME, &xSendTimeout);
		xSendTimeout.tv_sec += MQUEUE_TIMEOUT_SECONDS;

		printf("Dispatcher iteration #[%d] -- Sending msg to worker thread #[%d]. %s",
		       i, (int)pArgList.pOutboxID[i % MQUEUE_NUMBER_OF_WORKERS],
		       LINE_BREAK);

		xMessageSize = mq_timedsend(
			pArgList.pOutboxID[i % MQUEUE_NUMBER_OF_WORKERS],
			pcSendBuffer, MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, 0,
			&xSendTimeout);

		if (xMessageSize != 0) {
			/* This error is acceptable in our setup.
             * Since inbox for each thread fits only one message.
             * In reality, balance inbox size, message arrival rate, and message drop rate. */
			printf("An acceptable failure -- dispatcher failed to send eWORKER_CTRL_MSG_CONTINUE to outbox ID: %x. errno %d %s",
			       (int)pArgList
				       .pOutboxID[i % MQUEUE_NUMBER_OF_WORKERS],
			       errno, LINE_BREAK);
		}
	}

	/* Control thread is now done with distributing jobs. Tell workers they are done. */
	pcSendBuffer[0] = (char)eWORKER_CTRL_MSG_EXIT;

	for (i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++) {
		printf("Dispatcher [%d] -- Sending eWORKER_CTRL_MSG_EXIT to worker thread #[%d]. %s",
		       i, (int)pArgList.pOutboxID[i % MQUEUE_NUMBER_OF_WORKERS],
		       LINE_BREAK);

		/* This is a blocking call, to guarantee worker thread exits. */
		xMessageSize = mq_send(
			pArgList.pOutboxID[i % MQUEUE_NUMBER_OF_WORKERS],
			pcSendBuffer, MQUEUE_MSG_WORKER_CTRL_MSG_SIZE, 0);
	}

	return NULL;
}

/*-----------------------------------------------------------*/

/**
 * @brief Job distribution with actor model.
 *
 * See the top of this file for detailed description.
 */
void vStartPOSIXDemo(void *pvParameters)
{
	int i = 0;
	int iStatus = 0;

	printf("Run Task:vStartPOSIXDemo\n");
	/* Remove warnings about unused parameters. */
	(void)pvParameters;

	/* Handles of the threads and related resources. */
	DispatcherThreadResources_t pxDispatcher = { 0 };
	WorkerThreadResources_t pxWorkers[MQUEUE_NUMBER_OF_WORKERS] = { { 0 } };
	mqd_t workerMqueues[MQUEUE_NUMBER_OF_WORKERS] = { 0 };

	struct mq_attr xQueueAttributesWorker = {
		.mq_flags = 0,
		.mq_maxmsg = MQUEUE_MAX_NUMBER_OF_MESSAGES_WORKER,
		.mq_msgsize = MQUEUE_MSG_WORKER_CTRL_MSG_SIZE,
		.mq_curmsgs = 0
	};

	pxDispatcher.pOutboxID = workerMqueues;

	/* Create message queues for each worker thread. */
	for (i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++) {
		/* Prepare a unique queue name for each worker. */
		char qName[] = MQUEUE_WORKER_QNAME_BASE;
		qName[MQUEUE_WORKER_QNAME_BASE_LEN - 1] =
			qName[MQUEUE_WORKER_QNAME_BASE_LEN - 1] + i;

		/* Open a queue with --
         * O_CREAT -- create a message queue.
         * O_RDWR -- both receiving and sending messages.
         */
		pxWorkers[i].xInboxID =
			mq_open(qName, O_CREAT | O_RDWR, (mode_t)0,
				&xQueueAttributesWorker);

		if (pxWorkers[i].xInboxID == (mqd_t)-1) {
			printf("Invalid inbox (mqueue) for worker. %s",
			       LINE_BREAK);
			iStatus = DEMO_ERROR;
			break;
		}

		/* Outboxes of dispatcher thread is the inboxes of all worker threads. */
		pxDispatcher.pOutboxID[i] = pxWorkers[i].xInboxID;
	}

	/* Create and start Worker threads. */
	if (iStatus == 0) {
		for (i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++) {
			(void)pthread_create(&(pxWorkers[i].pxID), NULL,
					     prvWorkerThread, &pxWorkers[i]);
		}

		/* Create and start dispatcher thread. */
		(void)pthread_create(&(pxDispatcher.pxID), NULL,
				     prvDispatcherThread, &pxDispatcher);

		/* Actors will do predefined tasks in threads. Current implementation is that
         * dispatcher actor notifies worker actors to terminate upon finishing distributing tasks. */

		/* Wait for worker threads to join. */
		for (i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++) {
			(void)pthread_join(pxWorkers[i].pxID, NULL);
		}

		/* Wait for dispatcher thread to join. */
		(void)pthread_join(pxDispatcher.pxID, NULL);
	}

	/* Close and unlink worker message queues. */
	for (i = 0; i < MQUEUE_NUMBER_OF_WORKERS; i++) {
		char qName[] = MQUEUE_WORKER_QNAME_BASE;
		qName[MQUEUE_WORKER_QNAME_BASE_LEN - 1] =
			qName[MQUEUE_WORKER_QNAME_BASE_LEN - 1] + i;

		if (pxWorkers[i].xInboxID != 0) {
			(void)mq_close(pxWorkers[i].xInboxID);
			(void)mq_unlink(qName);
		}
	}

	/* Have something on console. */
	if (iStatus == 0) {
		printf("All threads finished. %s", LINE_BREAK);
	} else {
		printf("Queues did not get initialized properly. Did not run demo. %s",
		       LINE_BREAK);
	}

	/* This task was created with the native xTaskCreate() API function, so
	must not run off the end of its implementing thread. */
	vTaskDelete(NULL);
}
